{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# AdventureWorks - Medium"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Loading spark-stubs, spark-hive\n",
      "Adding Hive conf dir /opt/hive/conf to classpath\n",
      "Creating SparkSession\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "SLF4J: No SLF4J providers were found.\n",
      "SLF4J: Defaulting to no-operation (NOP) logger implementation\n",
      "SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a target=\"_blank\" href=\"http://jupyter:4040\">Spark UI</a>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$                                  \n",
       "\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.types._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.functions._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.expressions.Window\n",
       "\n",
       "\u001b[39m\n",
       "\u001b[36mspark\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@16ca0364"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import $ivy.`org.apache.spark::spark-sql:3.4.0`\n",
    "\n",
    "import org.apache.log4j.{Level, Logger}\n",
    "// Set the log4j logger named \"org\" to Level.OFF; presumably this keeps Spark's own log lines out of the cell output.\n",
    "Logger.getLogger(\"org\").setLevel(Level.OFF)\n",
    "\n",
    "import org.apache.spark._\n",
    "import org.apache.spark.sql._\n",
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.functions._\n",
    "import org.apache.spark.sql.expressions.Window\n",
    "\n",
    "// Notebook Spark session: local master, Hive catalog, warehouse directory on HDFS.\n",
    "// To target the standalone cluster instead, swap the master for \"spark://192.168.31.31:7077\";\n",
    "// the cores/executor settings below presumably only matter in that mode.\n",
    "val spark = {\n",
    "    // Session settings, applied to the builder in the order listed.\n",
    "    val sessionSettings = Seq(\n",
    "        \"spark.sql.warehouse.dir\" -> \"hdfs://192.168.31.31:9000/user/hive/warehouse\",\n",
    "        \"spark.cores.max\" -> \"4\",\n",
    "        \"spark.executor.instances\" -> \"1\",\n",
    "        \"spark.executor.cores\" -> \"2\",\n",
    "        \"spark.executor.memory\" -> \"10g\",\n",
    "        \"spark.shuffle.service.enabled\" -> \"false\",\n",
    "        \"spark.dynamicAllocation.enabled\" -> \"false\",\n",
    "        \"spark.sql.catalogImplementation\" -> \"hive\",\n",
    "        \"spark.sql.repl.eagerEval.enabled\" -> \"true\",\n",
    "        \"spark.driver.allowMultipleContexts\" -> \"true\"\n",
    "    )\n",
    "    val baseBuilder = NotebookSparkSession.builder()\n",
    "        .progress(false)\n",
    "        .appName(\"app14-2\")\n",
    "        .master(\"local[*]\")\n",
    "    sessionSettings\n",
    "        .foldLeft(baseBuilder) { case (builder, (key, value)) => builder.config(key, value) }\n",
    "        .getOrCreate()\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36mspark.implicits._\n",
       "\u001b[39m\n",
       "defined \u001b[32mfunction\u001b[39m \u001b[36msc\u001b[39m\n",
       "\u001b[36mhiveCxt\u001b[39m: \u001b[32msql\u001b[39m.\u001b[32mhive\u001b[39m.\u001b[32mHiveContext\u001b[39m = org.apache.spark.sql.hive.HiveContext@100b49f8"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import spark.implicits._\n",
    "// Shorthand for the SparkContext owned by the notebook session.\n",
    "def sc: SparkContext = spark.sparkContext\n",
    "// HiveContext built on that SparkContext; the table-loading cell reads every table through it.\n",
    "val hiveCxt = new org.apache.spark.sql.hive.HiveContext(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined \u001b[32mclass\u001b[39m \u001b[36mRichDF\u001b[39m"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// HTML table display for DataFrames in the notebook. Credit to Aivean\n",
    "implicit class RichDF(val ds:DataFrame) {\n",
    "    /** Publishes the first `limit` rows of the DataFrame as an HTML table.\n",
    "      *\n",
    "      * @param limit    maximum number of rows fetched with `take`\n",
    "      * @param truncate maximum characters kept per cell; zero or negative disables clipping\n",
    "      */\n",
    "    def showHTML(limit: Int = 50, truncate: Int = 100) = {\n",
    "        import xml.Utility.escape\n",
    "\n",
    "        // Plain-text form of a single cell value.\n",
    "        def cellText(value: Any): String = value match {\n",
    "          case null => \"null\"\n",
    "          case bytes: Array[Byte] => bytes.map(\"%02X\".format(_)).mkString(\"[\", \" \", \"]\")\n",
    "          case items: Array[_] => items.mkString(\"[\", \", \", \"]\")\n",
    "          case items: Seq[_] => items.mkString(\"[\", \", \", \"]\")\n",
    "          case other => other.toString\n",
    "        }\n",
    "\n",
    "        // Clip over-long text; the ellipsis is only added when at least 4 characters are allowed.\n",
    "        def clip(text: String): String = {\n",
    "          if (truncate <= 0 || text.length <= truncate) text\n",
    "          else if (truncate < 4) text.substring(0, truncate)\n",
    "          else text.substring(0, truncate - 3) + \"...\"\n",
    "        }\n",
    "\n",
    "        val fetched = ds.take(limit)\n",
    "        val headerHtml = ds.schema.fieldNames.toSeq\n",
    "          .map(name => s\"<th>${escape(name)}</th>\")\n",
    "          .mkString\n",
    "        val bodyHtml = fetched.toSeq.map { row =>\n",
    "          row.toSeq\n",
    "            .map(value => s\"<td>${escape(clip(cellText(value)))}</td>\")\n",
    "            .mkString(\"<tr>\", \"\", \"</tr>\")\n",
    "        }.mkString\n",
    "\n",
    "    publish.html(s\"\"\" <table>\n",
    "                <tr>\n",
    "                 $headerHtml\n",
    "                </tr>\n",
    "                $bodyHtml\n",
    "            </table>\n",
    "        \"\"\")\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[36mcust_aw\u001b[39m: \u001b[32mDataFrame\u001b[39m = [customerid: int, namestyle: string ... 11 more fields]\n",
       "\u001b[36mcust_addr\u001b[39m: \u001b[32mDataFrame\u001b[39m = [customerid: int, addressid: int ... 1 more field]\n",
       "\u001b[36maddr\u001b[39m: \u001b[32mDataFrame\u001b[39m = [addressid: int, addressline1: string ... 5 more fields]\n",
       "\u001b[36mproduct\u001b[39m: \u001b[32mDataFrame\u001b[39m = [productid: int, name: string ... 12 more fields]\n",
       "\u001b[36morder_det\u001b[39m: \u001b[32mDataFrame\u001b[39m = [salesorderid: int, salesorderdetailid: int ... 4 more fields]\n",
       "\u001b[36morder_head\u001b[39m: \u001b[32mDataFrame\u001b[39m = [salesorderid: int, revisionnumber: int ... 17 more fields]\n",
       "\u001b[36mprod_model\u001b[39m: \u001b[32mDataFrame\u001b[39m = [productmodelid: int, name: string ... 1 more field]\n",
       "\u001b[36mprod_model_prod\u001b[39m: \u001b[32mDataFrame\u001b[39m = [productmodelid: int, productdescriptionid: int ... 1 more field]\n",
       "\u001b[36mprod_desc\u001b[39m: \u001b[32mDataFrame\u001b[39m = [productdescriptionid: int, description: string]\n",
       "\u001b[36mprod_cat\u001b[39m: \u001b[32mDataFrame\u001b[39m = [productcategoryid: int, parentproductcategoryid: int ... 1 more field]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Bind each sqlzoo.* table to a DataFrame once, through hiveCxt; the exercise cells below refer to these names.\n",
    "val cust_aw = hiveCxt.table(\"sqlzoo.CustomerAW\")\n",
    "val cust_addr = hiveCxt.table(\"sqlzoo.CustomerAddress\")\n",
    "val addr = hiveCxt.table(\"sqlzoo.Address\")\n",
    "val product = hiveCxt.table(\"sqlzoo.Product\")\n",
    "val order_det = hiveCxt.table(\"sqlzoo.SalesOrderDetail\")\n",
    "val order_head = hiveCxt.table(\"sqlzoo.SalesOrderHeader\")\n",
    "val prod_model = hiveCxt.table(\"sqlzoo.ProductModel\")\n",
    "val prod_model_prod = hiveCxt.table(\"sqlzoo.ProductModelProductDescription\")\n",
    "val prod_desc = hiveCxt.table(\"sqlzoo.ProductDescription\")\n",
    "val prod_cat = hiveCxt.table(\"sqlzoo.ProductCategory\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6.\n",
    "A \"Single Item Order\" is a customer order where only one item is ordered. Show the SalesOrderID and the UnitPrice for every Single Item Order."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>SalesOrderID</th><th>UnitPrice</th>\n",
       "                </tr>\n",
       "                <tr><td>71936</td><td>20.99</td></tr><tr><td>71936</td><td>149.87</td></tr><tr><td>71936</td><td>158.43</td></tr><tr><td>71832</td><td>158.43</td></tr><tr><td>71796</td><td>31.58</td></tr><tr><td>71815</td><td>202.33</td></tr><tr><td>71858</td><td>54.89</td></tr><tr><td>71858</td><td>12.14</td></tr><tr><td>71935</td><td>24.29</td></tr><tr><td>71898</td><td>602.35</td></tr><tr><td>71856</td><td>54.89</td></tr><tr><td>71832</td><td>37.15</td></tr><tr><td>71832</td><td>818.7</td></tr><tr><td>71856</td><td>445.41</td></tr><tr><td>71784</td><td>200.05</td></tr><tr><td>71797</td><td>202.33</td></tr><tr><td>71846</td><td>818.7</td></tr><tr><td>71920</td><td>72.0</td></tr><tr><td>71832</td><td>31.58</td></tr><tr><td>71867</td><td>858.9</td></tr><tr><td>71899</td><td>48.59</td></tr><tr><td>71782</td><td>31.58</td></tr><tr><td>71902</td><td>31.58</td></tr><tr><td>71923</td><td>2.99</td></tr><tr><td>71832</td><td>48.59</td></tr><tr><td>71816</td><td>1430.44</td></tr><tr><td>71784</td><td>602.35</td></tr><tr><td>71902</td><td>72.16</td></tr><tr><td>71832</td><td>16.27</td></tr><tr><td>71846</td><td>113.0</td></tr><tr><td>71936</td><td>113.0</td></tr><tr><td>71902</td><td>218.45</td></tr><tr><td>71797</td><td>38.1</td></tr><tr><td>71796</td><td>54.94</td></tr><tr><td>71780</td><td>32.99</td></tr><tr><td>71796</td><td>728.91</td></tr><tr><td>71831</td><td>1376.99</td></tr><tr><td>71782</td><td>200.05</td></tr><tr><td>71898</td><td>12.14</td></tr><tr><td>71858</td><td>4.77</td></tr><tr><td>71863</td><td>1020.59</td></tr><tr><td>71863</td><td>20.99</td></tr><tr><td>71897</td><td>1430.44</td></tr><tr><td>71780</td><td>48.59</td></tr><tr><td>71796</td><td>1430.44</td></tr><tr><td>71845</td><td>818.7</td></tr><tr><td>71782</td><td>63.9</td></tr><tr><td>71783</td><td>72.16</td></tr><tr><td>71832</td><td>149.87</td></tr><tr><td>71902</td><td>37.25</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// A single-item order has exactly one unit across all of its detail lines, so the\n",
    "// total OrderQty per SalesOrderID must be 1 (assumes OrderQty is never zero or negative).\n",
    "// Grouping by (SalesOrderID, UnitPrice) instead would keep lines from multi-line orders.\n",
    "(order_det\n",
    " .withColumn(\"OrderItems\", sum(\"OrderQty\").over(Window.partitionBy(\"SalesOrderID\")))\n",
    " .filter(col(\"OrderItems\")===1)\n",
    " .select(\"SalesOrderID\", \"UnitPrice\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 7.\n",
    "Where did the racing socks go? List the product name and the CompanyName for all Customers who ordered ProductModel 'Racing Socks'."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>CompanyName</th><th>Name</th>\n",
       "                </tr>\n",
       "                <tr><td>Eastside Department Store</td><td>Racing Socks, L</td></tr><tr><td>Essential Bike Works</td><td>Racing Socks, L</td></tr><tr><td>Remarkable Bike Store</td><td>Racing Socks, L</td></tr><tr><td>Remarkable Bike Store</td><td>Racing Socks, M</td></tr><tr><td>Riding Cycles</td><td>Racing Socks, L</td></tr><tr><td>Sports Products Store</td><td>Racing Socks, M</td></tr><tr><td>Sports Products Store</td><td>Racing Socks, L</td></tr><tr><td>The Bicycle Accessories Company</td><td>Racing Socks, L</td></tr><tr><td>The Bicycle Accessories Company</td><td>Racing Socks, M</td></tr><tr><td>Thrifty Parts and Sales</td><td>Racing Socks, M</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// product -> order line -> order -> customer, then attach the model name\n",
    "// (renamed to ModelName so it does not clash with the product's Name).\n",
    "(product\n",
    " .join(order_det, Seq(\"ProductID\"))\n",
    " .join(order_head, Seq(\"SalesOrderID\"))\n",
    " .join(cust_aw, Seq(\"CustomerID\"))\n",
    " .join(prod_model.withColumnRenamed(\"Name\", \"ModelName\"), Seq(\"ProductModelID\"))\n",
    " .where($\"ModelName\" === \"Racing Socks\")\n",
    " .select($\"CompanyName\", $\"Name\")\n",
    " .dropDuplicates()\n",
    " .sort($\"CompanyName\".asc)\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 8.\n",
    "Show the product description for culture 'fr' for product with ProductID 736."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>Description</th>\n",
       "                </tr>\n",
       "                <tr><td>Le cadre LL en aluminium offre une conduite confortable, une excellente absorption des bosses pou...</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// product -> model/description link -> description text, narrowed to product 736\n",
    "// and to cultures whose code contains \"fr\".\n",
    "(product\n",
    " .join(prod_model_prod, Seq(\"ProductModelID\"))\n",
    " .join(prod_desc, Seq(\"ProductDescriptionID\"))\n",
    " .where($\"ProductID\" === 736)\n",
    " .where($\"Culture\".like(\"%fr%\"))\n",
    " .select($\"Description\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 9.\n",
    "Use the SubTotal value in SalesOrderHeader to list orders from the largest to the smallest. For each order show the CompanyName and the SubTotal and the total weight of the order."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>CompanyName</th><th>SubTotal</th><th>sum(Weight)</th>\n",
       "                </tr>\n",
       "                <tr><td>Action Bicycle Specialists</td><td>108561.83</td><td>1133911.56</td></tr><tr><td>Metropolitan Bicycle Supply</td><td>98278.69</td><td>679588.02</td></tr><tr><td>Bulk Discount Store</td><td>88812.86</td><td>34813.05</td></tr><tr><td>Eastside Department Store</td><td>83858.43</td><td>565638.72</td></tr><tr><td>Riding Cycles</td><td>78029.69</td><td>504095.33</td></tr><tr><td>Many Bikes Store</td><td>74058.81</td><td>744328.6</td></tr><tr><td>Instruments and Parts Company</td><td>63980.99</td><td>731576.77</td></tr><tr><td>Extreme Riding Supplies</td><td>57634.63</td><td>589939.11</td></tr><tr><td>Trailblazing Sports</td><td>41622.05</td><td>234328.12</td></tr><tr><td>Professional Sales and Service</td><td>39785.33</td><td>396843.63</td></tr><tr><td>Nearby Cycle Shop</td><td>38418.69</td><td>547260.47</td></tr><tr><td>Closest Bicycle Store</td><td>35775.21</td><td>340144.28</td></tr><tr><td>Thrilling Bike Tours</td><td>13823.71</td><td>191855.76</td></tr><tr><td>Paints and Solvents Company</td><td>12685.89</td><td>122609.42</td></tr><tr><td>Remarkable Bike Store</td><td>6634.3</td><td>45103.5</td></tr><tr><td>Engineered Bike Systems</td><td>3398.17</td><td>37420.66</td></tr><tr><td>Sports Products Store</td><td>3324.28</td><td>53389.08</td></tr><tr><td>Discount Tours</td><td>2980.79</td><td>14977.56</td></tr><tr><td>Sports Store</td><td>2453.76</td><td>38354.65</td></tr><tr><td>Coalition Bike Company</td><td>2415.67</td><td>29183.0</td></tr><tr><td>Aerobic Exercise Company</td><td>2137.23</td><td>6770.44</td></tr><tr><td>Tachometers and Accessories</td><td>2016.34</td><td>10591.33</td></tr><tr><td>Thrifty Parts and Sales</td><td>1141.58</td><td>3175.14</td></tr><tr><td>Vigorous Sports Store</td><td>1059.31</td><td>1043.26</td></tr><tr><td>Good Toys</td><td>880.35</td><td>2050.23</td></tr><tr><td>Transport Bikes</td><td>602.19</td><td>13301.08</td></tr><tr><td>Channel Outlet</td><td>550.39</td><td>0.0</td></tr><tr><td>Futuristic 
Bikes</td><td>246.74</td><td>0.0</td></tr><tr><td>The Bicycle Accessories Company</td><td>106.54</td><td>0.0</td></tr><tr><td>West Side Mart</td><td>78.81</td><td>317.0</td></tr><tr><td>Essential Bike Works</td><td>40.9</td><td>0.0</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(order_head.join(order_det, \"SalesOrderID\")\n",
    " .join(product, \"ProductID\")\n",
    " .join(cust_aw, \"CustomerID\")\n",
    " // Line weight = product weight x quantity; products without a weight count as 0.\n",
    " .withColumn(\"Weight\", col(\"Weight\") * col(\"OrderQty\"))\n",
    " .na.fill(0, Array(\"Weight\"))\n",
    " // Group per order: SalesOrderID keeps two orders from the same company with the\n",
    " // same SubTotal from being merged into one row.\n",
    " .groupBy(\"SalesOrderID\", \"CompanyName\", \"SubTotal\")\n",
    " .agg(round(sum(\"Weight\"), 2).alias(\"sum(Weight)\"))\n",
    " .orderBy(col(\"SubTotal\").desc)\n",
    " // The key was only needed for grouping; show the same three columns as before.\n",
    " .drop(\"SalesOrderID\")\n",
    " .showHTML()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 10.\n",
    "How many products in ProductCategory 'Cranksets' have been sold to an address in 'London'?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>count</th>\n",
       "                </tr>\n",
       "                <tr><td>2</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Cranksets products -> order lines -> orders -> customers -> customer addresses in London,\n",
    "// then count the joined rows.\n",
    "// NOTE(review): this counts joined order lines (not summed OrderQty) and matches any address\n",
    "// linked to the customer rather than the order's ship-to address - confirm that is the intended reading.\n",
    "(product\n",
    " .join(prod_cat.where($\"name\" === \"Cranksets\"), Seq(\"ProductCategoryID\"))\n",
    " .join(order_det, Seq(\"ProductID\"))\n",
    " .join(order_head, Seq(\"SalesOrderID\"))\n",
    " .join(cust_aw, Seq(\"CustomerID\"))\n",
    " .join(cust_addr, Seq(\"CustomerID\"))\n",
    " .join(addr.where($\"City\" === \"London\"), Seq(\"AddressID\"))\n",
    " .agg(count(lit(1)).alias(\"count\"))\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Stop the Spark session created in the first cell; the cells above cannot be re-run until it is recreated.\n",
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".sc",
   "mimetype": "text/x-scala",
   "name": "scala",
   "nbconvert_exporter": "script",
   "version": "2.12.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
